package com.tl.spark.scala


import java.io.ByteArrayInputStream

import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableOutputFormat}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants}
import org.apache.spark.{SparkConf, SparkContext}
import org.dom4j.{Document, Node}
import org.dom4j.io.SAXReader

import scala.collection.JavaConversions._
import scala.collection.JavaConverters.asScalaBufferConverter

/**
  * @program: spark-test
  * @description:
  * @author: dong.tl
  * @create: 2018-09-12 18:42
  **/
object ScalaReadHbase {

  /**
    * Scans every row of the HBase table "file_data", keeps rows whose
    * "file_info:type" column equals "XML" (case-insensitive), parses the
    * "file_info:bytes" column as an XML document with dom4j and prints the
    * WIPO ST.3 code selected by an XPath query.
    *
    * Side effects only (prints to stdout); nothing is returned.
    *
    * @param args unused command-line arguments
    */
  def main(args: Array[String]): Unit = {
    import scala.util.control.NonFatal

    val tableName = "file_data"

    // HBase configuration.
    val conf = HBaseConfiguration.create()
    // ZooKeeper quorum; could also come from hbase-site.xml on the classpath,
    // but setting it in code keeps the job self-contained.
    conf.set(HConstants.ZOOKEEPER_QUORUM, "db01,db02,db03")
    // ZooKeeper client port (2181 is the default; set explicitly for clarity).
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    // Table to read from.
    conf.set(TableInputFormat.INPUT_TABLE, tableName)
    // Table to write to — reuse the constant instead of repeating the literal.
    conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)

    // Spark configuration.
    val sparkConf = new SparkConf()
      .setAppName("read and write for hbase ")
      // NOTE(review): the original set the cluster master and then immediately
      // overwrote it with local[*], so only the local master was ever
      // effective. The dead cluster setting is preserved as a comment:
      // .setMaster("spark://192.168.173.221:7077")
      .setMaster("local[*]")
    //  .setJars(Seq("F:\\IdeaProjects\\spark-test\\target\\spark-test-1.0-SNAPSHOT.jar"))

    val sc = new SparkContext(sparkConf)
    try {
      // Full scan of the HBase table as (rowkey, Result) pairs.
      val rdd = sc.newAPIHadoopRDD(
        conf,
        classOf[TableInputFormat],
        classOf[ImmutableBytesWritable],
        classOf[Result]
      )

      println("rdd.count:" + rdd.count())

      rdd.filter { re =>
        // Bytes.toString returns null when the "file_info:type" column is
        // absent; Option(...).exists guards against the resulting NPE that
        // the original tp.equalsIgnoreCase("XML") call would throw.
        Option(Bytes.toString(re._2.getValue("file_info".getBytes, "type".getBytes)))
          .exists(_.equalsIgnoreCase("XML"))
      }.foreach { case (rowkey, result) =>
        val rk = Bytes.toString(rowkey.get())
        println("rowkey:" + rk)

        try {
          val reader = new SAXReader
          val doc: Document = reader.read(
            new ByteArrayInputStream(result.getValue("file_info".getBytes, "bytes".getBytes)))
          // XPath extracting the WIPO ST.3 code from the publication reference.
          val xpath = doc.createXPath("/business:PatentDocumentAndRelated/business:BibliographicData/business:PublicationReference[@dataFormat='standard']/base:DocumentID/base:WIPOST3Code/text()")
          // NOTE(review): the "business:"/"base:" prefixes are used without
          // registered namespace URIs (the setNamespaceURIs call below was
          // commented out in the original); the query may select nothing
          // unless the stored documents bind these prefixes — confirm.
          // xpath.setNamespaceURIs(Collections.singletonMap("business",
          //   "http://www.sipo.gov.cn/XMLSchema/business"))
          val nodes = xpath.selectNodes(doc)
          nodes.asScala.foreach(node => println(node.getStringValue))
        } catch {
          // Catch only non-fatal errors (parse/IO); let OOM etc. propagate.
          case NonFatal(exception) => println(rk + ":" + exception.getMessage)
        }
      }

      println("map begin")

      // Disabled write-back pipeline preserved from the original:
      //   // filter empty rows, transform each record into write format
      //   val final_rdd = rdd.filter(checkNotEmptyKs).map(forDatas)
      //   // filter again after the transformation
      //   val save_rdd = final_rdd.filter(checkNull)
      //   // finally write back to the HBase table
      //   save_rdd.saveAsNewAPIHadoopDataset(newAPIJobConfiguration1.getConfiguration)
    } finally {
      // Always release the SparkContext, even when the job throws.
      sc.stop()
    }
  }
}
